WebSocket+RabbitMQ实现消息推送系统 您所在的位置:网站首页 Redis labview WebSocket+RabbitMQ实现消息推送系统

WebSocket+RabbitMQ实现消息推送系统

#WebSocket+RabbitMQ实现消息推送系统| 来源: 网络整理| 查看: 265

前言:这里RabbitMQ的作用是进行流量削峰,减轻数据库的写压力和WebSocket的消息推送压力。如果你想增加系统的吞吐量,可以使用高吞吐量的RocketMQ或者Kafka代替RabbitMQ。

WebSocket源代码:WebSocket服务端消息推送

一、消息推送系统的重点问题 1.1、用户获取新的消息通知有两种模式

向指定用户发送WebSocket消息并处理对方不在线的情况:

如果接收者在线,系统直接推送消息给用户;

否则将消息存储到Redis,等用户上线后主动拉取未读消息。

1.2、为什么引入RabbitMQ

在本文的消息通知系统中,用户的通知消息和新通知提醒数据都放在数据库中,数据库的读写操作频繁。如果消息量大,DB压力较大,可能出现数据库瓶颈。而且业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力。这时候就可以引入消息队列RabbitMQ进行流量削峰。

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列作为抽象层,弥合双方的差异。利用消息中间件转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

试想上下游对于事情的处理能力是不同的。比如,Web接口服务器每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限,即使使用SSD加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上Web接口服务器。

这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用消息中间件转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

二、WebSocket+RabbitMQ消息推送架构图 2.1、功能架构图 

 一般还需要在「个人中心」需要有一个设置是否接收消息的设置项,满足用户个性化需求。

我们当前的流程有些取巧,原本应该是消费者发消息之前就去请求「用户消息设置」,用户设置成接收,才去产生消息的。而我们现在的流程中消费者不去关注用户设置,把所有消息都往「队列」里塞,让主流程去做过滤处理,这样各个生产者就不用每个都去单独处理,同时也少了一次网络交互。

2.2、技术架构图

从图中可以看出消息通知系统的基本流程是客户端A请求生产者服务端核心模块,核心模块生产一条消息到消息队列,然后消费者服务端消息模块消费消息,消费完之后就将消息推送给客户端B,流程很简单,没有太多技巧,唯一的巧妙之处就在消息模块这边的处理上。

三、消息通知的类型

几乎每个站点都有消息通知系统,可见通知系统的重要性不言而喻。通知系统看似简单,实际上比较复杂。那么本篇主要讲解常见的消息通知系统的设计和具体实现,包括数据库设计、逻辑关系分析等。

常见的站内通知类别:

公告 Announcement

提醒 Remind

资源订阅提醒「我关注的资源有更新、评论等事件时通知我」

资源发布提醒「我发布的资源有评论、收藏等事件时通知我」

系统提醒「平台会根据一些算法、规则等可能会对你的资源做一些事情,这时你会收到系统通知」

私信 Mailbox

以上三种消息有各自特点,实现也各不相同,其中「提醒」类通知是最复杂的:

通知事件:

通知事件就是当用户在网站或应用上产生了支付行为之后,如果你想给用户一个通知,告诉她系统已收到她的付款,那么你就要把这个「支付行为」定义为一个通知事件,并且保存这个通知事件到「通知事件表」里,以便通知系统作异步处理。通知系统会不断的处理「通知事件表」里的数据,分析每一个事件应该通知和不通知哪些人。

通知事件表「notify_event」

记录每一个用户行为产生的通知事件信息

表结构如下:

id: {type: 'integer', primaryKey: true, autoIncrement:true} userID: {type: 'string', required: true} //用户ID action: {type: 'string', required: true} //动作,如:捐款/更新/评论/收藏 objectID: {type: 'string', required: true}, //对象ID,如:文章ID; objectType: {type: 'string', required: true} //对象所属类型,如:人、文章、活动、视频等; createdAt:{type: 'timestamp', required: true} //创建时间;

用户行为定义

「action」即用户行为,如:赞了、评论了、喜欢了、捐款了、收藏了;一般来讲,我们把一个用户行为定义为一个通知类型,那么用户行为必须是需要提前定义好的。

由消息系统内部定义,为后台提供接口,用于通知设置。如下:

notify_action_type := ["donated","conllected","commented","updated"]

对象类型定义

「objectType」即用户行为作用的对象的所属类型,简单的说就是资源类型,如:项目、文章、评论、商品、视频、图片、用户。

由消息系统内部定义,为后台提供接口,用于通知设置。如下:

notify_object_type := ["project","comment"] 四、消息通知系统注意事项 4.1、Nginx代理webSocket时60s自动断开, 怎么保持长连接

利用nginx代理websocket的时候,发现客户端和服务器握手成功后,如果在60s时间内没有数据交互,连接就会自动断开。

nginx.conf 文件里location 中的proxy_read_timeout 默认60s断开,可以把他设置大一点,你可以设置成自己需要的时间,我这里设置的是十分钟(600s).

nginx配置如下:

server {         listen 80;         server_name carrefourzone.senguo.cc;         #error_page 502 /static/502.html;         location /static/ {             root /home/chenming/Carrefour/carrefour.senguo.cc/source;             expires 7d;             }         location / {             proxy_pass_header Server;             proxy_set_header Host $http_host;             proxy_redirect off;             proxy_set_header X-Real-IP $remote_addr;             proxy_set_header X-Scheme $scheme;             proxy_pass       http://127.0.0.1:9887;             proxy_http_version  1.1;             proxy_set_header    Upgrade    "websocket";             proxy_set_header    Connection "Upgrade";             proxy_read_timeout 600s;          }     }

按照上述方法设置好后,我们可以发现,如果在10分钟之内没有数据交互的话,websocket连接就会自动断开,所以这种方式还是有点问题,如果我页面停留时间超过十分钟而且又没有数据交互的话,连接还是会断开的,所以需要同时结合WebSocket心跳机制的方法.

4.2、WebSocket的心跳激活机制

心跳机制是每隔一段时间会向服务器发送一个数据包,告诉服务器自己还活着,同时客户端会确认服务器端是否还活着,如果还活着的话,就会回传一个数据包给客户端来确定服务器端也还活着,否则的话,有可能是网络断开连接了。需要重连。

WebSocket 长连接需要在弱网环境和网络暂时断开的情况下,需要有一个稳定的重连机制,保证在网络不稳定的时候,客户端和服务端能够重连,继续通信。

在nginx延长超时时间的基础上,前端在超时时间内发心跳包,刷新再读时间,前端具体实现见如下代码(此处代码包含了前端整个websocket的实现过程,重点关注心跳检测的内容):

// websocket连接 var websocket_connected_count = 0; var onclose_connected_count = 0; function newWebSocket(){ var websocket = null; // 判断当前环境是否支持websocket if(window.WebSocket){ if(!websocket){ var ws_url ="wss://"+domain+"/updatewebsocket"; websocket = new WebSocket(ws_url); } }else{ Tip("not support websocket"); } // 连接成功建立的回调方法 websocket.onopen = function(e){ heartCheck.reset().start(); // 成功建立连接后,重置心跳检测 Tip("connected successfully") } // 连接发生错误,连接错误时会继续尝试发起连接(尝试5次) websocket.onerror = function() { console.log("onerror连接发生错误") websocket_connected_count++; if(websocket_connected_count


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有